/**
 * Copyright (C), 2017-2018, XXX有限公司
 * FileName: NettyServer
 * Author:   zengjian
 * Date:     2018/10/8 17:16
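 * Description: Netty-based RPC server that decodes Hessian-serialized requests and dispatches them to locally registered services.
 * History:
 * <author>          <time>          <version>          <desc>
 * author name       modified time   version            description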
 */
package org.yinxue.framework.rpc.tansport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yinxue.framework.rpc.RpcRequest;
import org.yinxue.framework.rpc.RpcResponse;
import org.yinxue.framework.rpc.register.LocalRegister;
import org.yinxue.framework.rpc.register.ServiceExporter;


import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Netty-based RPC server.
 *
 * <p>Every inbound {@link ByteBuf} is decoded with {@code HessianDecoder} into an
 * {@link RpcRequest}, dispatched to the {@link ServiceExporter} that the
 * {@link LocalRegister} holds for the request's service name, and answered with a
 * {@link RpcResponse} encoded by {@code HessianEncoder}.
 *
 * <p>{@link #start(String, int)} blocks the calling thread until the server channel is
 * closed, so {@link #stop()} has to be invoked from a different thread.
 *
 * @author zengjian
 * @create 2018/10/8 17:16
 */
public class NettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    /** Host used by {@link #start()}. */
    public static final String DEFAULT_HOST = "localhost";
    /** Port used by {@link #start()}. */
    public static final int DEFAULT_PORT = 8080;

    private final LocalRegister localRegister;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    // These are written by the thread that is blocked inside start() and read by whichever
    // thread calls stop(), hence volatile.
    private volatile EventLoopGroup boss;
    private volatile EventLoopGroup worker;
    private volatile ServerBootstrap serverBootstrap;
    private volatile ChannelFuture channelFuture;

    /**
     * Creates a server that resolves services through the given register.
     *
     * @param localRegister register used to look up the {@link ServiceExporter} for a request
     * @throws InterruptedException never thrown by this constructor; the clause is retained
     *                              only so that existing callers keep compiling
     */
    public NettyServer(LocalRegister localRegister) throws InterruptedException {
        this.localRegister = localRegister;
    }

    /**
     * Binds to {@code host:port} and blocks until the server channel is closed.
     *
     * <p>A second call while the server is already running is a no-op. Whatever the reason
     * this method returns or throws (bind failure, interruption, channel closed, or
     * {@link #stop()}), the event loop groups it created are shut down and the started flag
     * is cleared so the server can be started again.
     *
     * @param host local address to bind to
     * @param port local port to bind to
     */
    public void start(String host, int port) {
        if (!isStarted.compareAndSet(false, true)) {
            LOGGER.info("Netty is started, No need to start again !");
            return;
        }
        // Locals keep the cleanup below tied to the groups created by THIS invocation,
        // even if the fields are replaced by a later start().
        EventLoopGroup bossGroup = null;
        EventLoopGroup workerGroup = null;
        try {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            boss = bossGroup;
            worker = workerGroup;

            ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new RpcInitHandler());
            serverBootstrap = bootstrap;

            ChannelFuture bindFuture = bootstrap.bind(host, port).sync();
            channelFuture = bindFuture;
            LOGGER.info("NettyServer启动监听，ip地址:{}，端口号:{}", host, port);

            // Park this thread until the server channel goes away.
            bindFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Trailing Throwable argument makes SLF4J log the stack trace as well.
            LOGGER.error("NettyServer启动异常，ip地址:{}，端口号:{}", host, port, e);
            Thread.currentThread().interrupt();
        } finally {
            // Without this, a failed bind (e.g. port already in use) would leak the event
            // loop threads and leave the server permanently flagged as started.
            shutdown(bossGroup);
            shutdown(workerGroup);
            isStarted.set(false);
        }
    }

    /**
     * Starts the server on {@link #DEFAULT_HOST}:{@link #DEFAULT_PORT}; see
     * {@link #start(String, int)}.
     */
    public void start() {
        this.start(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Requests shutdown of the event loop groups and clears the started flag.
     *
     * <p>Safe to call when the server was never started or has already stopped.
     */
    public void stop() {
        isStarted.compareAndSet(true, false);
        shutdown(boss);
        shutdown(worker);
    }

    /** Initiates graceful shutdown of {@code group}; does nothing when it is {@code null}. */
    private static void shutdown(EventLoopGroup group) {
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /** Builds a failure response that carries {@code message} and the id of {@code request}. */
    private static RpcResponse newFailureResponse(RpcRequest request, String message) {
        RpcResponse failure = new RpcResponse();
        failure.setErrorMessage(message);
        failure.setRequestId(request.getRequestId());
        return failure;
    }

    /** Installs the request decoder and the dispatching handler on every accepted channel. */
    private class RpcInitHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("RpcRequestReadChildHandler", new RpcRequestReadChildHandler());
            pipeline.addLast("RpcChildHandler", new RpcChildHandler());
        }
    }

    /**
     * Turns the readable bytes of an inbound {@link ByteBuf} into an {@link RpcRequest}.
     *
     * <p>NOTE(review): no frame decoder precedes this handler in the pipeline, so this
     * assumes each buffer holds exactly one complete serialized request. Confirm, or add
     * length-prefixed framing together with a matching change on the client side.
     *
     * <p>NOTE(review): the bytes come straight from the network and are handed to
     * {@code HessianDecoder}; deserializing untrusted input should be reviewed.
     */
    private static class RpcRequestReadChildHandler extends MessageToMessageDecoder<ByteBuf> {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            byte[] req = new byte[msg.readableBytes()];
            msg.readBytes(req);
            out.add((RpcRequest) HessianDecoder.INSTANCE.decode(req));
        }
    }

    /** Dispatches decoded requests to the registered service and writes the response back. */
    private class RpcChildHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            LOGGER.debug("NettyServer读取事件触发");
            if (!(msg instanceof RpcRequest)) {
                // Not ours: pass it on so the rest of the pipeline can handle or release it
                // instead of dropping it silently.
                ctx.fireChannelRead(msg);
                return;
            }

            RpcRequest rpcRequest = (RpcRequest) msg;
            LOGGER.info("NettyServer接收到请求:{}", rpcRequest);
            ServiceExporter serviceExporter = localRegister.getServiceExporter(rpcRequest.getServiceName());

            RpcResponse rpcResponse;
            if (serviceExporter == null) {
                LOGGER.error("NettyServer未找到对应服务:{}", rpcRequest);
                rpcResponse = newFailureResponse(rpcRequest, "NettyServer未找到对应服务");
            } else {
                try {
                    rpcResponse = serviceExporter.invoke(rpcRequest.getArgs());
                    rpcResponse.setRequestId(rpcRequest.getRequestId());
                    LOGGER.info("NettyServer处理结果:{}", rpcResponse);
                } catch (Exception e) {
                    // Always answer the caller; otherwise it would wait for a response
                    // that is never sent.
                    LOGGER.error("NettyServer调用服务异常:{}", rpcRequest, e);
                    rpcResponse = newFailureResponse(rpcRequest, "NettyServer调用服务异常: " + e);
                }
            }
            ctx.writeAndFlush(Unpooled.wrappedBuffer(HessianEncoder.INSTANCE.encode(rpcResponse)));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("RpcChildHandler发生异常", cause);
            super.exceptionCaught(ctx, cause);
        }
    }

    /**
     * Starts a server with an empty {@link LocalRegister} on the default host and port.
     *
     * @param args ignored
     * @throws InterruptedException declared by the {@link NettyServer} constructor
     */
    public static void main(String[] args) throws InterruptedException {
        new NettyServer(new LocalRegister()).start();
    }
}